---
title: Sensors | Dagster
description: Sensors allow you to instigate runs based on any external state change.
---

# Sensors

Sensors allow you to instigate runs based on any external state change.

## Relevant APIs

| Name                                    | Description                                                                                                                                                                                                     |
| --------------------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| <PyObject object="sensor" decorator />  | The decorator used to define a sensor. The decorated function is called the `evaluation_fn`. The decorator returns a <PyObject object="SensorDefinition" />                                                     |
| <PyObject object="RunRequest" />        | The sensor evaluation function can yield one or more run requests. Each run request creates a pipeline run.                                                                                                     |
| <PyObject object="SkipReason" />        | If a sensor evaluation doesn't yield any run requests, it can instead yield a skip reason to log why the evaluation was skipped or why there were no events to be processed.                                    |
| <PyObject object="SensorDefinition"  /> | Base class for sensors. You almost never want to use initialize this class directly. Instead, you should use the <PyObject object="sensor" decorator /> which returns a <PyObject object="SensorDefinition"  /> |

## Overview

Sensors are definitions in Dagster that allow you to instigate runs based on some external state change automatically. For example, you can:

- Launch a run whenever a file appears in an s3 bucket
- Launch a run whenever another pipeline materializes a specific asset
- Launch a run whenever an external system is down

Sensors have several important properties:

- Each sensor targets a specific pipeline
- A sensor optionally defines tags, a mode, and a solid selection for the targeted pipeline.
- A sensor defines an evaluation function that returns either:
  - One or more <PyObject object="RunRequest"/> objects. Each run request launches a run.
  - An optional <PyObject object="SkipReason"/>, which specifies a message which describes why no runs were requested.

The [Dagster Daemon](/deployment/dagster-daemon) runs each sensor evaluation function on a tight loop. If you are using sensors, make sure to follow the instructions on the [DagsterDaemon](/deployment/dagster-daemon) page to run your sensors.

## Defining a sensor

To define a sensor, use the <PyObject object="sensor" decorator /> decorator. The decorated function is called the `execution_fn` and must have `context` as the first argument. The context is a <PyObject object="SensorExecutionContext" />

Let's say you have a pipeline that logs a filename that is specified in the solid configuration of the `process_file` solid:

```python file=concepts/partitions_schedules_sensors/sensors/sensors.py startafter=start_sensor_pipeline_marker endbefore=end_sensor_pipeline_marker
from dagster import solid, pipeline


@solid(config_schema={"filename": str})
def process_file(context):
    filename = context.solid_config["filename"]
    context.log.info(filename)


@pipeline
def log_file_pipeline():
    process_file()
```

You can write a sensor that watches for new files in a specific directory and `yields` a `RunRequest` for each new file in the directory. By default, this sensor every 30 seconds.

```python file=concepts/partitions_schedules_sensors/sensors/sensors.py startafter=start_directory_sensor_marker endbefore=end_directory_sensor_marker
import os
from dagster import sensor, RunRequest


@sensor(pipeline_name="log_file_pipeline")
def my_directory_sensor(_context):
    for filename in os.listdir(MY_DIRECTORY):
        filepath = os.path.join(MY_DIRECTORY, filename)
        if os.path.isfile(filepath):
            yield RunRequest(
                run_key=filename,
                run_config={"solids": {"process_file": {"config": {"filename": filename}}}},
            )
```

This sensor iterates through all the files in `MY_DIRECTORY` and `yields` a <PyObject object="RunRequest"/> for each file.

## Idempotence and Cursors

When instigating runs based on external events, you usually want to run exactly one pipeline run for each event. There are two ways to define your sensors to avoid creating duplicate runs for your events: using `run_key` and using a cursor.

### Idempotence using run keys

In the example sensor above, the <PyObject object="RunRequest"/> is constructed with a `run_key`.

```python file=concepts/partitions_schedules_sensors/sensors/sensors.py startafter=start_run_request_marker endbefore=end_run_request_marker
yield RunRequest(
        run_key=filename,
        run_config={"solids": {"process_file": {"config": {"filename": filename}}}},
    )
```

Dagster guarantees that for a given sensor, at most one run is created for each <PyObject object="RunRequest"/> with a unique `run_key`. If a sensor yields a new run request with a previously used `run_key`, Dagster skips processing the new run request.

In the example, a <PyObject object="RunRequest"/> is requested for each file during _every_ sensor evaluation. Therefore, for a given sensor evaluation, there already exists a `RunRequest` with a `run_key` for any file that existed during the previous sensor evaluation. Dagster skips processing duplicate run requests, so Dagster launches runs for only the files added since the last sensor evaluation. The result is exactly one run per file.

Run keys allow you to write sensor evaluation functions that declaratively describe what pipeline runs should exist, and helps you avoid the need for more complex logic that manages state. However, when dealing with high-volume external events, some state-tracking optimizations might be necessary.

### Sensor optimizations using cursors

When writing a sensor that deals with high-volume events, it might not be feasible to `yield` a <PyObject object="RunRequest"/> during every sensor evaluation. For example, you may have an `s3` storage bucket that contains thousands of files.

When writing a sensor for such event sources, you can use run keys to maintain a cursor that limits the number of yielded run requests for previously processed events. The sensor API provides the last evaluated run key to serve as this cursor:

- `last_run_key`: A cursor field on <PyObject object="SensorExecutionContext"/> that returns the run key of the last run requested by a previous sensor evaluation.

Here is an example of our directory file sensor using `last_run_key` as a cursor for updated files.

```python file=concepts/partitions_schedules_sensors/sensors/sensors.py startafter=start_cursor_sensors_marker endbefore=end_cursor_sensors_marker
def build_run_key(filename, mtime):
    return f"{filename}:{str(mtime)}"


def parse_run_key(run_key):
    parts = run_key.split(":")
    return parts[0], float(parts[1])


@sensor(pipeline_name="log_file_pipeline")
def my_directory_sensor_cursor(context):
    last_mtime = parse_run_key(context.last_run_key)[1] if context.last_run_key else None

    for filename in os.listdir(MY_DIRECTORY):
        filepath = os.path.join(MY_DIRECTORY, filename)
        if os.path.isfile(filepath):
            fstats = os.stat(filepath)
            file_mtime = fstats.st_mtime
            if file_mtime > last_mtime:
                # the run key should include mtime if we want to kick off new runs based on file modifications
                run_key = build_run_key(filename, file_mtime)
                run_config = ({"solids": {"process_file": {"config": {"filename": filename}}}},)
                yield RunRequest(run_key=run_key, run_config=run_config)
```

## Evaluation Interval

By default, the Dagster Daemon runs a sensor 30 seconds after the previous sensor evaluation finishes executing. You can configure the interval using the `minimum_interval_seconds` argument on the <PyObject object="sensor" decorator/> decorator.

It's important to note that this interval represents a minimum interval _between_ runs of the sensor and not the exact frequency the sensor runs. If you have a sensor that takes 2 minutes to complete, but the `minimum_interval_seconds` is 5 seconds, the fastest Dagster Daemon will run the sensor is every 2 minutes and 5 seconds. The `minimum_interval_seconds` only guarantees that the sensor is not evaluated more frequently than the given interval.

For example, here are two sensors that specify two different minimum intervals:

```python file=concepts/partitions_schedules_sensors/sensors/sensors.py startafter=start_interval_sensors_maker endbefore=end_interval_sensors_maker
@sensor(pipeline_name="my_pipeline", minimum_interval_seconds=30)
def sensor_A(_context):
    yield RunRequest(run_key=None, run_config={})


@sensor(pipeline_name="my_pipeline", minimum_interval_seconds=45)
def sensor_B(_context):
    yield RunRequest(run_key=None, run_config={})
```

These sensor definitions are short, so they run in less than a second. Therefore, you can expect these sensors to run consistently around every 30 and 45 seconds, respectively.

## Skipping sensor evaluations

For debugging purposes, it is often useful to describe why a sensor might not yield any runs for a given evaluation. The sensor evaluation function can yield a <PyObject object="SkipReason" /> with a string description that will be displayed in Dagit.

For example, here is our directory sensor that now provides a SkipReason when no files are encountered:

```python file=concepts/partitions_schedules_sensors/sensors/sensors.py startafter=start_skip_sensors_marker endbefore=end_skip_sensors_marker
@sensor(pipeline_name="log_file_pipeline")
def my_directory_sensor_with_skip_reasons(_context):
    has_files = False
    for filename in os.listdir(MY_DIRECTORY):
        filepath = os.path.join(MY_DIRECTORY, filename)
        if os.path.isfile(filepath):
            yield RunRequest(
                run_key=filename,
                run_config={"solids": {"process_file": {"config": {"filename": filename}}}},
            )
            has_files = True
    if not has_files:
        yield SkipReason(f"No files found in {MY_DIRECTORY}.")
```

## Testing sensors

To quickly preview what an existing sensor would generate when evaluated, you can run the CLI command `dagster sensor preview my_sensor_name`.

## Monitoring sensors in Dagit

<!--
    These images were generated by running

    ```
    cd dagster/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/sensors/
    dagit -f sensors.py
    ```
 -->

You can monitor and operate sensors in Dagit. There are multiple views that help with observing sensor evaluations, skip reasons, and errors.

To view the sensors page, click the "All sensors" in the left-hand navigation pane. Here you can turn sensors on and off using the toggle.

![All Sensors](/images/concepts/partitions-schedules-sensors/sensors/all-sensors.png)

If you click on any sensor, you can monitor all sensor evaluations and runs created:

![Sensor A](/images/concepts/partitions-schedules-sensors/sensors/sensor-A.png)

If your sensor throws an error or yields a skip reason, the sensor timeline view will display more information about the errors and skips:

![My Directory Sensor](/images/concepts/partitions-schedules-sensors/sensors/my-directory-sensor.png)

## Examples

### Asset sensors

A useful pattern is to create a sensor that checks for new <PyObject object="AssetMaterialization" /> events for a particular asset key. This can be used to kick off a pipeline that computes downstream assets or notifies appropriate stakeholders.

One benefit of this pattern is that it enables cross-pipeline and even cross-repository dependencies. Each pipeline run instigated by an asset sensor is agnostic to the pipeline event that caused it.

Here is an example of a sensor that listens for asset materializations for a given asset key `my_table`:

```python file=/concepts/partitions_schedules_sensors/sensors/sensors.py startafter=start_asset_sensors_marker endbefore=end_asset_sensors_marker
from dagster import AssetKey


@sensor(pipeline_name="my_pipeline")
def my_asset_sensor(context):
    events = context.instance.events_for_asset_key(
        AssetKey("my_table"), after_cursor=context.last_run_key, ascending=False, limit=1
    )
    if events:
        record_id, event = events[0]  # take the most recent materialization
        yield RunRequest(
            run_key=str(record_id), run_config={}, tags={"source_pipeline": event.pipeline_name}
        )
```

### Pipeline failure sensor

If you want to act on pipeline failures, e.g., you need to send an alert to a monitoring service on pipeline failure. You can write a sensor that monitors Dagster's runs table and launches a specialized "alert" pipeline for each failed run.

For example, you can write an "alert" pipeline that sends a slack message when it runs. Note that the pipeline depends on a `slack` resource:

```python file=/concepts/partitions_schedules_sensors/sensors/sensor_alert.py startafter=start_alert_pipeline_marker endbefore=end_alert_pipeline_marker
@solid(required_resource_keys={"slack"})
def slack_message_on_failure_solid(context):
    message = f"Solid {context.solid.name} failed"
    context.resources.slack.chat.post_message(channel="#foo", text=message)


@pipeline(
    mode_defs=[
        ModeDefinition(name="test", resource_defs={"slack": ResourceDefinition.mock_resource()}),
        ModeDefinition(name="prod", resource_defs={"slack": slack_resource}),
    ]
)
def failure_alert_pipeline():
    slack_message_on_failure_solid()
```

Then, you can define a sensor that fetches the failed runs from the runs table via `context.instance`, and instigates a `failure_alert_pipeline` run for every failed run. Note that we use the failed run's id as the `run_key` to prevent sending an alert twice for the same pipeline run.

```python file=/concepts/partitions_schedules_sensors/sensors/sensor_alert.py startafter=start_alert_sensor_marker endbefore=end_alert_sensor_marker
@sensor(pipeline_name="failure_alert_pipeline", mode="prod")
def pipeline_failure_sensor(context):
    runs = context.instance.get_runs(
        filters=PipelineRunsFilter(
            pipeline_name="your_pipeline_name",
            statuses=[PipelineRunStatus.FAILURE],
        ),
    )
    for run in runs:
        # Use the id of the failed run as run_key to avoid duplicate alerts.
        yield RunRequest(run_key=str(run.run_id))
```

If you would like to set up success or failure handling policies on solids, you can find more information on the [Solid Hooks](/concepts/solids-pipelines/solid-hooks) page.

### S3 sensors

For pipelines that should initiate new runs for new paths in an s3 bucket, the `dagster-aws` package provides the useful helper function `get_s3_keys`.

Here is an example of a sensor that listens to a particular s3 bucket `my_s3_bucket`:

```python file=/concepts/partitions_schedules_sensors/sensors/sensors.py startafter=start_s3_sensors_marker endbefore=end_s3_sensors_marker
from dagster_aws.s3.sensor import get_s3_keys


@sensor(pipeline_name="my_pipeline")
def my_s3_sensor(context):
    new_s3_keys = get_s3_keys("my_s3_bucket", since_key=context.last_run_key)
    if not new_s3_keys:
        yield SkipReason("No new s3 files found for bucket my_s3_bucket.")
        return
    for s3_key in new_s3_keys:
        yield RunRequest(run_key=s3_key, run_config={})
```
